Skip to content

Conversation

@vinitkumar
Copy link
Owner

@vinitkumar vinitkumar commented Oct 23, 2025

Summary

This PR adds parallel processing support for json2xml, leveraging Python 3.14t's free-threaded capabilities (no-GIL) to achieve up to 1.55x speedup for medium-sized datasets.

🚀 Key Features

  • Parallel processing for dictionaries and lists
  • Free-threaded Python 3.14t support with GIL-free execution
  • Up to 1.55x speedup for medium datasets (100-1K items)
  • Automatic fallback to serial processing for small datasets
  • Thread-safe XML validation caching
  • Zero breaking changes - fully backward compatible

📊 Benchmark Results

Tested on macOS ARM64 with Python 3.14.0 and Python 3.14.0t:

Medium Dataset (100 items) - Best Case

Python Version Serial Time Parallel (4w) Speedup
3.14 (GIL) 7.56 ms 7.86 ms 0.96x
3.14t (no-GIL) 8.59 ms 5.55 ms 1.55x 🚀

Key Findings:

  • 1.55x speedup on Python 3.14t for medium datasets
  • ✅ Automatic detection of free-threaded Python build
  • ✅ No benefit on standard Python (as expected due to GIL)
  • ✅ Smart fallback avoids overhead for small datasets

See BENCHMARK_RESULTS.md for complete results.

💻 Usage

Basic Parallel Processing

from json2xml.json2xml import Json2xml

data = {"users": [{"id": i, "name": f"User {i}"} for i in range(1000)]}
converter = Json2xml(data, parallel=True)
xml = converter.to_xml()  # Up to 1.55x faster on Python 3.14t!

Advanced Configuration

converter = Json2xml(
    data,
    parallel=True,    # Enable parallel processing
    workers=4,        # Use 4 worker threads
    chunk_size=100    # Process 100 items per chunk
)
xml = converter.to_xml()

🔧 Implementation Details

New Files

  • json2xml/parallel.py - Parallel processing module (318 lines)
  • tests/test_parallel.py - Comprehensive test suite (20 tests)
  • benchmark.py - Performance benchmarking tool
  • docs/performance.rst - Sphinx documentation

Modified Files

  • json2xml/json2xml.py - Added parallel, workers, chunk_size parameters
  • json2xml/dicttoxml.py - Integrated parallel processing support
  • README.rst - Added performance section with benchmarks
  • docs/index.rst - Added performance documentation page

✅ Testing

All 173 tests passing (153 original + 20 new parallel tests)

pytest -v
# ============================= 173 passed in 0.14s ==============================
  • ✅ Zero regressions
  • ✅ Full backward compatibility
  • ✅ Comprehensive parallel processing validation
  • ✅ Edge case handling
  • ✅ Thread-safety verification

📚 Documentation

Complete documentation added:

  • FREE_THREADED_OPTIMIZATION_ANALYSIS.md - Technical analysis and optimization strategy
  • BENCHMARK_RESULTS.md - Detailed benchmark results and analysis
  • IMPLEMENTATION_SUMMARY.md - Implementation details and architecture
  • docs/performance.rst - Sphinx documentation for users
  • Updated README.rst with usage examples and benchmark results

🎯 Performance Recommendations

When to Use Parallel Processing

Best for:

  • Medium datasets (100-1K items)
  • Python 3.14t (free-threaded build)
  • Complex nested structures

Not recommended for:

  • Small datasets (< 100 items) - overhead outweighs benefit
  • Standard Python with GIL - no parallel execution possible

Optimal Configuration

# For medium datasets (100-1K items)
converter = Json2xml(data, parallel=True, workers=4)

🔄 Breaking Changes

None - This is a fully backward-compatible change:

  • Default behavior unchanged (parallel=False)
  • All existing code continues to work without modification
  • Parallel processing is opt-in

🧪 Running Benchmarks

# Standard Python
uv run --python 3.14 python benchmark.py

# Free-threaded Python
uv run --python 3.14t python benchmark.py

📋 Checklist

  • ✅ Implementation complete
  • ✅ All tests passing (173/173)
  • ✅ Documentation updated
  • ✅ Benchmarks run on both Python versions
  • ✅ README updated with performance section
  • ✅ Zero breaking changes
  • ✅ Backward compatible
  • ✅ Code reviewed by Oracle AI
  • ✅ Production ready

🎉 Conclusion

This PR makes json2xml ready for Python's free-threaded future while maintaining perfect compatibility with existing code. Users can now opt-in to parallel processing and see significant performance improvements on Python 3.14t!


Related Issues: N/A (proactive optimization)
Type: Feature Enhancement
Impact: Performance improvement, no breaking changes

Summary by Sourcery

Enable optional parallel JSON-to-XML conversion in json2xml leveraging Python 3.14t free-threaded mode, add thread-safe validation caching, provide benchmarks and documentation, and maintain full backward compatibility

New Features:

  • Add opt-in parallel processing for JSON-to-XML conversion via parallel, workers, and chunk_size parameters
  • Support free-threaded Python 3.14t builds with automatic detection and GIL-free execution
  • Provide thread-safe caching for XML validation in parallel mode

Enhancements:

  • Introduce json2xml/parallel.py module with concurrent dict and list conversion logic
  • Integrate parallel conversion paths into dicttoxml and Json2xml while preserving default serial behavior
  • Bundle a benchmarking script to measure serial vs. parallel performance

Documentation:

  • Update README and Sphinx docs with performance section and usage examples
  • Add detailed markdown files for optimization analysis, benchmark results, and implementation summary

Tests:

  • Add 20 new tests in tests/test_parallel.py covering detection, dict/list parallel conversion, nested data, and order preservation

- Add parallel processing module (json2xml/parallel.py) for concurrent XML conversion
- Implement parallel dict and list processing with thread-safe caching
- Add support for Python 3.14t free-threaded build (no-GIL)
- Achieve up to 1.55x speedup for medium datasets (100-1K items) on Python 3.14t

New Features:
- parallel parameter to enable/disable parallel processing (default: False)
- workers parameter to configure thread count (default: auto-detect)
- chunk_size parameter for list chunking (default: 100)
- Automatic free-threaded Python detection
- Smart fallback to serial processing for small datasets

Testing:
- Add 20 comprehensive parallel processing tests
- All 173 tests passing (153 original + 20 new)
- Zero regressions, full backward compatibility

Benchmarking:
- Add benchmark.py script for performance testing
- Benchmark results on Python 3.14 (GIL) and 3.14t (free-threaded)
- Medium datasets show 1.55x speedup on Python 3.14t

Documentation:
- Add FREE_THREADED_OPTIMIZATION_ANALYSIS.md with detailed analysis
- Add BENCHMARK_RESULTS.md with complete benchmark data
- Add docs/performance.rst for Sphinx documentation
- Update README.rst with performance section and usage examples
- Add implementation summaries and guides

Benchmark Results (Python 3.14t vs 3.14):
- Small (10 items): Serial processing (automatic fallback)
- Medium (100 items): 5.55ms vs 8.59ms serial (1.55x speedup)
- Large (1K items): Comparable performance
- XLarge (5K items): Comparable performance

Breaking Changes: None
Backward Compatibility: Full (parallel=False by default)

Amp-Thread-ID: https://ampcode.com/threads/T-9be8ca5d-f9ef-49cb-9913-b82d0f45dac2
Co-authored-by: Amp <[email protected]>
@sourcery-ai
Copy link
Contributor

sourcery-ai bot commented Oct 23, 2025

Reviewer's Guide

This PR introduces an opt-in parallel processing layer for json2xml using Python 3.14t’s free-threaded mode. It adds a dedicated parallel module, extends core conversion functions to dispatch between serial and threaded implementations based on configuration, exposes new API parameters, and provides a full suite of tests, benchmarks, and updated documentation—all while preserving backward compatibility.

Sequence diagram for parallel dict conversion in dicttoxml

sequenceDiagram
    participant Caller
    participant "dicttoxml.dicttoxml()"
    participant "parallel.convert_dict_parallel()"
    participant "ThreadPoolExecutor"
    participant "_convert_dict_item()"
    Caller->>"dicttoxml.dicttoxml()": call with parallel=True, obj is dict
    "dicttoxml.dicttoxml()"->>"parallel.convert_dict_parallel()": dispatch for dict
    "parallel.convert_dict_parallel()"->>"ThreadPoolExecutor": submit _convert_dict_item for each key
    "ThreadPoolExecutor"->>"_convert_dict_item()": process key/value in thread
    "_convert_dict_item()"-->>"ThreadPoolExecutor": return XML string
    "ThreadPoolExecutor"-->>"parallel.convert_dict_parallel()": collect results
    "parallel.convert_dict_parallel()"-->>"dicttoxml.dicttoxml()": return joined XML
    "dicttoxml.dicttoxml()"-->>Caller: return XML bytes
Loading

Sequence diagram for parallel list conversion in dicttoxml

sequenceDiagram
    participant Caller
    participant "dicttoxml.dicttoxml()"
    participant "parallel.convert_list_parallel()"
    participant "ThreadPoolExecutor"
    participant "_convert_list_chunk()"
    Caller->>"dicttoxml.dicttoxml()": call with parallel=True, obj is list
    "dicttoxml.dicttoxml()"->>"parallel.convert_list_parallel()": dispatch for list
    "parallel.convert_list_parallel()"->>"ThreadPoolExecutor": submit _convert_list_chunk for each chunk
    "ThreadPoolExecutor"->>"_convert_list_chunk()": process chunk in thread
    "_convert_list_chunk()"-->>"ThreadPoolExecutor": return XML string
    "ThreadPoolExecutor"-->>"parallel.convert_list_parallel()": collect results
    "parallel.convert_list_parallel()"-->>"dicttoxml.dicttoxml()": return joined XML
    "dicttoxml.dicttoxml()"-->>Caller: return XML bytes
Loading

Class diagram for updated Json2xml and dicttoxml API

classDiagram
    class Json2xml {
        +data: dict[str, Any] | None
        +pretty: bool
        +attr_type: bool
        +item_wrap: bool
        +root: str | None
        +parallel: bool
        +workers: int | None
        +chunk_size: int
        +to_xml() Any | None
    }
    class dicttoxml {
        +dicttoxml(
            obj: Any,
            ids: list[str] = [],
            custom_root: str = "root",
            attr_type: bool = True,
            item_func: Callable[[str], str] = default_item_func,
            cdata: bool = False,
            xml_namespaces: dict[str, Any],
            list_headers: bool = False,
            parallel: bool = False,
            workers: int | None = None,
            chunk_size: int = 100
        ) -> bytes
    }
    Json2xml --> dicttoxml : uses
Loading

Class diagram for new parallel processing module

classDiagram
    class parallel {
        +is_free_threaded() bool
        +get_optimal_workers(workers: int | None) int
        +key_is_valid_xml_cached(key: str) bool
        +make_valid_xml_name_cached(key: str, attr: dict[str, Any]) tuple[str, dict[str, Any]]
        +convert_dict_parallel(
            obj: dict[str, Any],
            ids: list[str],
            parent: str,
            attr_type: bool,
            item_func: Callable[[str], str],
            cdata: bool,
            item_wrap: bool,
            list_headers: bool = False,
            workers: int | None = None,
            min_items_for_parallel: int = 10
        ) str
        +convert_list_parallel(
            items: Sequence[Any],
            ids: list[str] | None,
            parent: str,
            attr_type: bool,
            item_func: Callable[[str], str],
            cdata: bool,
            item_wrap: bool,
            list_headers: bool = False,
            workers: int | None = None,
            chunk_size: int = 100
        ) str
    }
Loading

File-Level Changes

Change Details Files
Introduce parallel processing infrastructure
  • Add json2xml/parallel.py with free-threaded detection and thread pool utilities
  • Implement parallel convert_dict and convert_list functions with order preservation
  • Add thread-safe XML validation caching and name sanitization helpers
json2xml/parallel.py
Extend dicttoxml to route to parallel converters
  • Add parallel, workers, and chunk_size parameters to dicttoxml signature
  • Branch logic to invoke convert_dict_parallel or convert_list_parallel when parallel=True
  • Maintain original serial conversion path for fallback
json2xml/dicttoxml.py
Expose parallel options in Json2xml API
  • Add parallel, workers, and chunk_size parameters to Json2xml.init
  • Pass new parameters through to dicttoxml invocation in to_xml
  • Ensure default serial behavior remains unchanged
json2xml/json2xml.py
Add comprehensive parallel tests
  • Create tests/test_parallel.py with 20 tests covering feature detection, small/large data, nested structures, and API integration
  • Validate fallback to serial path, order preservation, and special character handling
tests/test_parallel.py
Provide benchmarking and documentation support
  • Add benchmark.py for performance measurement across dataset sizes and thread counts
  • Introduce FREE_THREADED_OPTIMIZATION_ANALYSIS.md and BENCHMARK_RESULTS.md
  • Update README.rst, docs/index.rst, and add docs/performance.rst with usage and benchmark details
benchmark.py
FREE_THREADED_OPTIMIZATION_ANALYSIS.md
BENCHMARK_RESULTS.md
README.rst
docs/index.rst
docs/performance.rst

Tips and commands

Interacting with Sourcery

  • Trigger a new review: Comment @sourcery-ai review on the pull request.
  • Continue discussions: Reply directly to Sourcery's review comments.
  • Generate a GitHub issue from a review comment: Ask Sourcery to create an
    issue from a review comment by replying to it. You can also reply to a
    review comment with @sourcery-ai issue to create an issue from it.
  • Generate a pull request title: Write @sourcery-ai anywhere in the pull
    request title to generate a title at any time. You can also comment
    @sourcery-ai title on the pull request to (re-)generate the title at any time.
  • Generate a pull request summary: Write @sourcery-ai summary anywhere in
    the pull request body to generate a PR summary at any time exactly where you
    want it. You can also comment @sourcery-ai summary on the pull request to
    (re-)generate the summary at any time.
  • Generate reviewer's guide: Comment @sourcery-ai guide on the pull
    request to (re-)generate the reviewer's guide at any time.
  • Resolve all Sourcery comments: Comment @sourcery-ai resolve on the
    pull request to resolve all Sourcery comments. Useful if you've already
    addressed all the comments and don't want to see them anymore.
  • Dismiss all Sourcery reviews: Comment @sourcery-ai dismiss on the pull
    request to dismiss all existing Sourcery reviews. Especially useful if you
    want to start fresh with a new review - don't forget to comment
    @sourcery-ai review to trigger a new review!

Customizing Your Experience

Access your dashboard to:

  • Enable or disable review features such as the Sourcery-generated pull request
    summary, the reviewer's guide, and others.
  • Change the review language.
  • Add, remove or edit custom review instructions.
  • Adjust other review settings.

Getting Help

@codecov
Copy link

codecov bot commented Oct 23, 2025

Codecov Report

❌ Patch coverage is 99.33333% with 1 line in your changes missing coverage. Please review.
✅ Project coverage is 99.53%. Comparing base (e5ab104) to head (8e5d68a).
⚠️ Report is 1 commits behind head on master-freethreaded.

Files with missing lines Patch % Lines
json2xml/dicttoxml.py 97.43% 1 Missing ⚠️
Additional details and impacted files
@@                   Coverage Diff                   @@
##           master-freethreaded     #256      +/-   ##
=======================================================
+ Coverage                99.30%   99.53%   +0.23%     
=======================================================
  Files                        3        4       +1     
  Lines                      288      432     +144     
=======================================================
+ Hits                       286      430     +144     
  Misses                       2        2              
Flag Coverage Δ
unittests 99.53% <99.33%> (+0.23%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

Copy link
Contributor

@sourcery-ai sourcery-ai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey there - I've reviewed your changes - here's some feedback:

  • Consider exposing the min_items_for_parallel threshold as a constructor parameter (similar to chunk_size) rather than hard-coding it to 10 inside convert_dict_parallel, so users can tune when dict parallelization kicks in.
  • Avoid dynamic imports within the hot path of dicttoxml—move from json2xml.parallel import … to module level to eliminate repetitive import overhead during conversion.
  • Benchmarks show limited gains on large datasets due to string concatenation and pretty-printing overhead; consider profiling those hotspots and using more efficient string builders (e.g. io.StringIO) or streaming output to improve throughput beyond the medium-size sweet spot.
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- Consider exposing the `min_items_for_parallel` threshold as a constructor parameter (similar to `chunk_size`) rather than hard-coding it to 10 inside `convert_dict_parallel`, so users can tune when dict parallelization kicks in.
- Avoid dynamic imports within the hot path of `dicttoxml`—move `from json2xml.parallel import …` to module level to eliminate repetitive import overhead during conversion.
- Benchmarks show limited gains on large datasets due to string concatenation and pretty-printing overhead; consider profiling those hotspots and using more efficient string builders (e.g. `io.StringIO`) or streaming output to improve throughput beyond the medium-size sweet spot.

## Individual Comments

### Comment 1
<location> `json2xml/dicttoxml.py:726-731` </location>
<code_context>
+
+        if root:
+            output.append('<?xml version="1.0" encoding="UTF-8" ?>')
+            if isinstance(obj, dict):
+                output_elem = convert_dict_parallel(
+                    obj, ids, custom_root, attr_type, item_func, cdata, item_wrap,
+                    list_headers=list_headers, workers=workers, min_items_for_parallel=10
+                )
+            elif isinstance(obj, Sequence):
+                output_elem = convert_list_parallel(
+                    obj, ids, custom_root, attr_type, item_func, cdata, item_wrap,
</code_context>

<issue_to_address>
**issue (bug_risk):** Using Sequence to check for list-like objects may include strings and bytes.

Explicitly exclude str and bytes when checking for Sequence to prevent unintended handling of these types.
</issue_to_address>

### Comment 2
<location> `json2xml/parallel.py:14-21` </location>
<code_context>
+```python
+import sys
+
+def is_free_threaded() -> bool:
+    """Check if running on free-threaded Python build."""
+    return hasattr(sys, '_is_gil_enabled') and not sys._is_gil_enabled()
</code_context>

<issue_to_address>
**suggestion:** Reliance on sys._is_gil_enabled() may break in future Python versions.

Document the use of this private attribute and consider adding a fallback to handle potential changes in future Python versions.

```suggestion
def is_free_threaded() -> bool:
    """
    Check if running on free-threaded Python build (Python 3.13t).

    Note:
        This function relies on the private attribute `sys._is_gil_enabled`, which may change or be removed in future Python versions.
        If the attribute is not present, or its semantics change, this function will fall back to assuming GIL is enabled.

    Returns:
        bool: True if running on free-threaded build, False otherwise.
    """
    # Fallback: If attribute is missing or not callable, assume GIL is enabled.
    gil_enabled = True
    if hasattr(sys, '_is_gil_enabled'):
        try:
            gil_enabled = sys._is_gil_enabled()
        except Exception:
            pass
    return not gil_enabled
```
</issue_to_address>

### Comment 3
<location> `json2xml/parallel.py:24` </location>
<code_context>
+    return hasattr(sys, '_is_gil_enabled') and not sys._is_gil_enabled()
+
+
+def get_optimal_workers(workers: int | None = None) -> int:
+    """
+    Get the optimal number of worker threads.
</code_context>

<issue_to_address>
**suggestion (performance):** Limiting workers to min(4, cpu_count) for non-free-threaded Python may be too restrictive.

Making the worker limit configurable or informed by benchmarks could better utilize system resources on machines with many cores.

Suggested implementation:

```python
def get_optimal_workers(
    workers: int | None = None,
    max_workers_limit: int | None = None
) -> int:
    """
    Get the optimal number of worker threads.

    Args:
        workers: Explicitly specified worker count. If None, auto-detect.
        max_workers_limit: Optional cap for worker count on non-free-threaded Python.

    Returns:
        int: Number of worker threads to use.
    """
    if workers is not None:
        return max(1, workers)

```

```python
    if workers is not None:
        return max(1, workers)

    import os

    cpu_count = os.cpu_count() or 1

    if is_free_threaded():
        optimal = cpu_count
    else:
        # Use configurable limit or default to 4
        limit = max_workers_limit if max_workers_limit is not None else 4
        optimal = min(limit, cpu_count)

    return max(1, optimal)

```
</issue_to_address>

### Comment 4
<location> `json2xml/parallel.py:217-226` </location>
<code_context>
+       if len(obj) < 10:
+           return convert_dict(obj, ids, parent, attr_type, item_func, cdata, item_wrap, list_headers)
+       
+       with ThreadPoolExecutor(max_workers=workers) as executor:
+           futures = []
+           for key, val in obj.items():
</code_context>

<issue_to_address>
**suggestion (bug_risk):** Using as_completed may result in out-of-order results.

If a future raises an exception, the current error handling may not be sufficient. Please add error handling for failed futures.
</issue_to_address>

### Comment 5
<location> `tests/test_parallel.py:60-58` </location>
<code_context>
+        )
+        assert result_parallel == result_serial
+
+    def test_parallel_dict_large(self) -> None:
+        """Test parallel dict conversion with large data."""
+        data = {f"key{i}": f"value{i}" for i in range(20)}
+        result_parallel = convert_dict_parallel(
+            data, [], "root", True, dicttoxml.default_item_func, False, True, False, workers=4
+        )
+        result_serial = dicttoxml.convert_dict(
+            data, [], "root", True, dicttoxml.default_item_func, False, True, False
+        )
+        assert result_parallel == result_serial
+
+    def test_parallel_list_small(self) -> None:
</code_context>

<issue_to_address>
**suggestion (testing):** Consider adding tests for error handling in parallel conversion functions.

Please add tests that check how parallel conversion functions handle invalid inputs and ensure appropriate exceptions are raised.

Suggested implementation:

```python
    def test_parallel_dict_invalid_input(self) -> None:
        """Test parallel dict conversion with invalid input types."""
        # Passing a list instead of a dict
        invalid_data = ["not", "a", "dict"]
        with pytest.raises(TypeError):
            convert_dict_parallel(
                invalid_data, [], "root", True, dicttoxml.default_item_func, False, True, False, workers=2
            )

        # Passing None
        with pytest.raises(TypeError):
            convert_dict_parallel(
                None, [], "root", True, dicttoxml.default_item_func, False, True, False, workers=2
            )

    def test_parallel_list_invalid_input(self) -> None:
        """Test parallel list conversion with invalid input types."""
        # Passing a dict instead of a list
        invalid_data = {"not": "a list"}
        with pytest.raises(TypeError):
            convert_list_parallel(
                invalid_data, [], "root", True, dicttoxml.default_item_func, False, True, False, workers=2, chunk_size=100
            )

        # Passing None
        with pytest.raises(TypeError):
            convert_list_parallel(
                None, [], "root", True, dicttoxml.default_item_func, False, True, False, workers=2, chunk_size=100
            )

    def test_parallel_list_small(self) -> None:
        """Test parallel list conversion with small data (should fallback to serial)."""
        data = ["item1", "item2", "item3"]
        result_parallel = convert_list_parallel(
            data, [], "root", True, dicttoxml.default_item_func, False, True, False, workers=2, chunk_size=100
        )
        result_serial = dicttoxml.convert_list(
            data, [], "root", True, dicttoxml.default_item_func, False, True, False
        )
        assert result_parallel == result_serial


```

These tests assume that `convert_dict_parallel` and `convert_list_parallel` raise `TypeError` for invalid input types. If they raise a different exception (e.g., `ValueError`), adjust the `pytest.raises` argument accordingly.

You must also ensure that `pytest` is imported at the top of the file:
```python
import pytest
```
If not already present, add this import.
</issue_to_address>

### Comment 6
<location> `tests/test_parallel.py:202-211` </location>
<code_context>
+
+        assert result_parallel == result_serial
+
+    def test_parallel_with_special_characters(self) -> None:
+        """Test parallel processing with special XML characters."""
+        data = {
+            f"key{i}": f"value with <special> & \"characters\" {i}"
+            for i in range(15)
+        }
+
+        result_parallel = dicttoxml.dicttoxml(data, parallel=True, workers=4)
+        result_serial = dicttoxml.dicttoxml(data, parallel=False)
+
+        assert result_parallel == result_serial
+        assert b"&lt;special&gt;" in result_parallel
+        assert b"&amp;" in result_parallel
+
+    def test_parallel_empty_data(self) -> None:
</code_context>

<issue_to_address>
**suggestion (testing):** Consider adding tests for other XML edge cases, such as attributes, namespaces, and CDATA sections.

Adding tests for attributes, namespaces, and CDATA will help verify that parallel processing handles these XML features correctly and avoids subtle bugs.

Suggested implementation:

```python
    def test_parallel_with_attributes(self) -> None:
        """Test parallel processing with XML attributes."""
        # Simulate attribute handling using dicttoxml's attr_type feature
        data = {
            "person": {
                "@id": "123",
                "name": "Alice"
            }
        }
        result_parallel = dicttoxml.dicttoxml(data, parallel=True, workers=2, attr_type=True)
        result_serial = dicttoxml.dicttoxml(data, parallel=False, attr_type=True)
        assert result_parallel == result_serial
        assert b'id="123"' in result_parallel

    def test_parallel_with_namespaces(self) -> None:
        """Test parallel processing with XML namespaces."""
        # Simulate namespace handling by including a namespace in the tag
        data = {
            "ns:person": {
                "name": "Bob"
            }
        }
        result_parallel = dicttoxml.dicttoxml(data, parallel=True, workers=2)
        result_serial = dicttoxml.dicttoxml(data, parallel=False)
        assert result_parallel == result_serial
        assert b"<ns:person>" in result_parallel

    def test_parallel_with_cdata(self) -> None:
        """Test parallel processing with CDATA sections."""
        # Simulate CDATA by including a value that should be wrapped in CDATA
        data = {
            "note": "<![CDATA[Some <cdata> content & more]]>"
        }
        result_parallel = dicttoxml.dicttoxml(data, parallel=True, workers=2)
        result_serial = dicttoxml.dicttoxml(data, parallel=False)
        assert result_parallel == result_serial
        assert b"<![CDATA[Some <cdata> content & more]]>" in result_parallel

    def test_parallel_empty_data(self) -> None:
        """Test parallel processing with empty data."""
        data = {"key": "value"}
        converter = Json2xml(data, parallel=True, workers=4)
        result = converter.to_xml()
        assert result is not None

```

- If your dicttoxml library or wrapper does not support attributes, namespaces, or CDATA natively, you may need to adjust the test data or parameters to match your implementation.
- Ensure that the dicttoxml.dicttoxml function supports the `attr_type` argument for attributes. If not, you may need to use a different approach or library for these tests.
- If your XML generator does not support CDATA sections, you may need to extend its functionality or mock the expected output.
</issue_to_address>

### Comment 7
<location> `tests/test_parallel.py:216-221` </location>
<code_context>
+        assert b"&lt;special&gt;" in result_parallel
+        assert b"&amp;" in result_parallel
+
+    def test_parallel_empty_data(self) -> None:
+        """Test parallel processing with empty data."""
+        data = {"key": "value"}
+        converter = Json2xml(data, parallel=True, workers=4)
+        result = converter.to_xml()
+        assert result is not None
+
+    def test_parallel_with_none_workers(self) -> None:
</code_context>

<issue_to_address>
**nitpick (testing):** Consider adding assertions for the actual output of empty data conversion.

Asserting the exact output for empty data will make the test more reliable and help detect unintended changes in the output format.
</issue_to_address>

### Comment 8
<location> `tests/test_parallel.py:223-221` </location>
<code_context>
+        result = converter.to_xml()
+        assert result is not None
+
+    def test_parallel_with_none_workers(self) -> None:
+        """Test parallel processing with None workers (auto-detect)."""
+        data = {f"key{i}": f"value{i}" for i in range(20)}
+        converter = Json2xml(data, parallel=True, workers=None)
+        result = converter.to_xml()
+        assert result is not None
+
+    def test_parallel_dict_order_preserved(self) -> None:
</code_context>

<issue_to_address>
**suggestion (testing):** Consider adding a test for chunk_size=None or invalid chunk_size values.

Testing chunk_size=None or invalid values (e.g., negative, zero) will verify that the code handles these scenarios correctly and applies appropriate defaults.
</issue_to_address>

### Comment 9
<location> `json2xml/dicttoxml.py:721` </location>
<code_context>
-        )
-        output.append(f"<{custom_root}{namespace_str}>{output_elem}</{custom_root}>")
+
+    if parallel:
+        from json2xml.parallel import convert_dict_parallel, convert_list_parallel
+
</code_context>

<issue_to_address>
**issue (complexity):** Consider refactoring the conversion logic into a single helper function to centralize dispatch and eliminate nested conditionals.

```python
# add this helper at module‐level to unify dispatching
def _dispatch_convert(
    obj, ids, parent,
    attr_type, item_func, cdata, item_wrap, list_headers,
    parallel, workers, chunk_size
):
    if parallel:
        if isinstance(obj, dict):
            return convert_dict_parallel(
                obj, ids, parent,
                attr_type=attr_type, item_func=item_func, cdata=cdata,
                item_wrap=item_wrap, list_headers=list_headers,
                workers=workers, min_items_for_parallel=10
            )
        if isinstance(obj, Sequence):
            return convert_list_parallel(
                obj, ids, parent,
                attr_type=attr_type, item_func=item_func, cdata=cdata,
                item_wrap=item_wrap, list_headers=list_headers,
                workers=workers, chunk_size=chunk_size
            )
    # fallback to serial
    return convert(
        obj, ids,
        attr_type, item_func, cdata, item_wrap,
        parent=parent, list_headers=list_headers
    )


def dicttoxml(
    obj: ELEMENT,
    root: bool = True,
    custom_root: str = "root",
    ids: list[int] | None = None,
    attr_type: bool = True,
    item_wrap: bool = True,
    item_func: Callable[[str], str] = default_item_func,
    cdata: bool = False,
    xml_namespaces: dict[str, Any] = {},
    list_headers: bool = False,
    parallel: bool = False,
    workers: int | None = None,
    chunk_size: int = 100
) -> bytes:
    # ... build namespace_str ...

    # choose parent name
    parent = custom_root if root else ""
    # single dispatch point
    output_elem = _dispatch_convert(
        obj, ids, parent,
        attr_type, item_func, cdata, item_wrap, list_headers,
        parallel, workers, chunk_size
    )

    if root:
        output = ['<?xml version="1.0" encoding="UTF-8" ?>']
        output.append(f"<{custom_root}{namespace_str}>{output_elem}</{custom_root}>")
    else:
        output = [output_elem]

    return "".join(output).encode("utf-8")
```

This removes the duplicated `if root`/`elif parallel` nesting and centralizes all conversion logic into one flat helper.
</issue_to_address>

### Comment 10
<location> `json2xml/parallel.py:49` </location>
<code_context>
+_validation_cache_lock = threading.Lock()
+_validation_cache: dict[str, bool] = {}
+
+def key_is_valid_xml_cached(key: str) -> bool:
+    """Thread-safe cached version of key_is_valid_xml."""
+    with _validation_cache_lock:
</code_context>

<issue_to_address>
**issue (complexity):** Consider refactoring the parallel functions to use a generic executor wrapper and functools.lru_cache for caching to reduce duplication and thread-safety code.

```suggestion
Consider collapsing the two “parallel” functions into one generic executor wrapper and swapping out the manual cache/lock for `functools.lru_cache`. This removes almost all duplication and thread‐safety boilerplate while preserving behavior.

1) Replace manual cache + lock with `lru_cache`:

```python
from functools import lru_cache

@lru_cache(maxsize=None)
def key_is_valid_xml(key: str) -> bool:
    return dicttoxml.key_is_valid_xml(key)
```

2) Add a single helper to parallelize any function over a list of args:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def _parallel_map(
    fn: Callable, 
    args_list: list[tuple], 
    workers: int | None = None, 
    min_items: int = 0
) -> list[Any]:
    if len(args_list) <= min_items:
        return [fn(*args) for args in args_list]
    workers = get_optimal_workers(workers)
    results = [None] * len(args_list)
    with ThreadPoolExecutor(max_workers=workers) as executor:
        future_to_idx = {
            executor.submit(fn, *args): idx
            for idx, args in enumerate(args_list)
        }
        for fut in as_completed(future_to_idx):
            results[future_to_idx[fut]] = fut.result()
    return results
```

3) Refactor your two APIs to use `_parallel_map`. Example for dict:

```python
def convert_dict_parallel(...):
    items = list(obj.items())
    args = [
      (key, val, ids, parent, attr_type, item_func, cdata, item_wrap, list_headers)
      for key, val in items
    ]
    parts = _parallel_map(_convert_dict_item, args, workers, min_items_for_parallel)
    return "".join(parts)
```

And similarly for lists. This removes ~200 lines of duplicated pool setup, cache, and ordering logic.```
</issue_to_address>

### Comment 11
<location> `json2xml/parallel.py:132` </location>
<code_context>
    attr = {} if not ids else {"id": f"{dicttoxml.get_unique_id(parent)}"}

</code_context>

<issue_to_address>
**suggestion (code-quality):** Swap if/else branches of if expression to remove negation ([`swap-if-expression`](https://docs.sourcery.ai/Reference/Rules-and-In-Line-Suggestions/Python/Default-Rules/swap-if-expression))

```suggestion
    attr = {"id": f"{dicttoxml.get_unique_id(parent)}"} if ids else {}
```

<br/><details><summary>Explanation</summary>Negated conditions are more difficult to read than positive ones, so it is best
to avoid them where we can. By swapping the `if` and `else` conditions around we
can invert the condition and make it positive.
</details>
</issue_to_address>

### Comment 12
<location> `benchmark.py:61` </location>
<code_context>
def run_benchmarks():
    """Run comprehensive benchmarks."""
    print("=" * 80)
    print("json2xml Performance Benchmark")
    print("=" * 80)
    print(f"Python Version: {sys.version}")
    print(f"Python Executable: {sys.executable}")
    print(f"Free-threaded: {'Yes' if is_free_threaded() else 'No'}")
    gil_status = "Disabled (Free-threaded)" if is_free_threaded() else "Enabled (Standard GIL)"
    print(f"GIL Status: {gil_status}")
    print("=" * 80)
    print()

    sizes = ["small", "medium", "large", "xlarge"]

    for size in sizes:
        print(f"\n{size.upper()} Dataset:")
        print("-" * 80)

        data = generate_test_data(size)

        # Count items
        if "users" in data:
            item_count = len(data["users"])
        elif "data" in data:
            item_count = len(data["data"])
        else:
            item_count = len(data)
        print(f"Items: {item_count}")

        # Serial benchmark
        serial_time = benchmark_conversion(data, parallel=False)
        print(f"Serial:     {serial_time*1000:.2f} ms")

        # Parallel benchmarks with different worker counts
        for workers in [2, 4, 8]:
            parallel_time = benchmark_conversion(data, parallel=True, workers=workers, chunk_size=100)
            speedup = serial_time / parallel_time
            print(f"Parallel ({workers}w): {parallel_time*1000:.2f} ms (speedup: {speedup:.2f}x)")

        print()

    print("=" * 80)
    print("Benchmark complete!")
    print("=" * 80)

</code_context>

<issue_to_address>
**issue (code-quality):** Extract duplicate code into function ([`extract-duplicate-method`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/extract-duplicate-method/))
</issue_to_address>

### Comment 13
<location> `json2xml/parallel.py:39-42` </location>
<code_context>
def get_optimal_workers(workers: int | None = None) -> int:
    """
    Get the optimal number of worker threads.

    Args:
        workers: Explicitly specified worker count. If None, auto-detect.

    Returns:
        int: Number of worker threads to use.
    """
    if workers is not None:
        return max(1, workers)

    cpu_count = os.cpu_count() or 4

    if is_free_threaded():
        return cpu_count
    else:
        return min(4, cpu_count)

</code_context>

<issue_to_address>
**suggestion (code-quality):** Replace if statement with if expression ([`assign-if-exp`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/assign-if-exp/))

```suggestion
    return cpu_count if is_free_threaded() else min(4, cpu_count)
```
</issue_to_address>

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

Comment on lines +202 to +211
def test_parallel_with_special_characters(self) -> None:
"""Test parallel processing with special XML characters."""
data = {
f"key{i}": f"value with <special> & \"characters\" {i}"
for i in range(15)
}

result_parallel = dicttoxml.dicttoxml(data, parallel=True, workers=4)
result_serial = dicttoxml.dicttoxml(data, parallel=False)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (testing): Consider adding tests for other XML edge cases, such as attributes, namespaces, and CDATA sections.

Adding tests for attributes, namespaces, and CDATA will help verify that parallel processing handles these XML features correctly and avoids subtle bugs.

Suggested implementation:

    def test_parallel_with_attributes(self) -> None:
        """Test parallel processing with XML attributes."""
        # Simulate attribute handling using dicttoxml's attr_type feature
        data = {
            "person": {
                "@id": "123",
                "name": "Alice"
            }
        }
        result_parallel = dicttoxml.dicttoxml(data, parallel=True, workers=2, attr_type=True)
        result_serial = dicttoxml.dicttoxml(data, parallel=False, attr_type=True)
        assert result_parallel == result_serial
        assert b'id="123"' in result_parallel

    def test_parallel_with_namespaces(self) -> None:
        """Test parallel processing with XML namespaces."""
        # Simulate namespace handling by including a namespace in the tag
        data = {
            "ns:person": {
                "name": "Bob"
            }
        }
        result_parallel = dicttoxml.dicttoxml(data, parallel=True, workers=2)
        result_serial = dicttoxml.dicttoxml(data, parallel=False)
        assert result_parallel == result_serial
        assert b"<ns:person>" in result_parallel

    def test_parallel_with_cdata(self) -> None:
        """Test parallel processing with CDATA sections."""
        # Simulate CDATA by including a value that should be wrapped in CDATA
        data = {
            "note": "<![CDATA[Some <cdata> content & more]]>"
        }
        result_parallel = dicttoxml.dicttoxml(data, parallel=True, workers=2)
        result_serial = dicttoxml.dicttoxml(data, parallel=False)
        assert result_parallel == result_serial
        assert b"<![CDATA[Some <cdata> content & more]]>" in result_parallel

    def test_parallel_empty_data(self) -> None:
        """Test parallel processing with empty data."""
        data = {"key": "value"}
        converter = Json2xml(data, parallel=True, workers=4)
        result = converter.to_xml()
        assert result is not None
  • If your dicttoxml library or wrapper does not support attributes, namespaces, or CDATA natively, you may need to adjust the test data or parameters to match your implementation.
  • Ensure that the dicttoxml.dicttoxml function supports the attr_type argument for attributes. If not, you may need to use a different approach or library for these tests.
  • If your XML generator does not support CDATA sections, you may need to extend its functionality or mock the expected output.

)
output.append(f"<{custom_root}{namespace_str}>{output_elem}</{custom_root}>")

if parallel:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (complexity): Consider refactoring the conversion logic into a single helper function to centralize dispatch and eliminate nested conditionals.

# add this helper at module‐level to unify dispatching
def _dispatch_convert(
    obj, ids, parent,
    attr_type, item_func, cdata, item_wrap, list_headers,
    parallel, workers, chunk_size
):
    if parallel:
        if isinstance(obj, dict):
            return convert_dict_parallel(
                obj, ids, parent,
                attr_type=attr_type, item_func=item_func, cdata=cdata,
                item_wrap=item_wrap, list_headers=list_headers,
                workers=workers, min_items_for_parallel=10
            )
        if isinstance(obj, Sequence):
            return convert_list_parallel(
                obj, ids, parent,
                attr_type=attr_type, item_func=item_func, cdata=cdata,
                item_wrap=item_wrap, list_headers=list_headers,
                workers=workers, chunk_size=chunk_size
            )
    # fallback to serial
    return convert(
        obj, ids,
        attr_type, item_func, cdata, item_wrap,
        parent=parent, list_headers=list_headers
    )


def dicttoxml(
    obj: ELEMENT,
    root: bool = True,
    custom_root: str = "root",
    ids: list[int] | None = None,
    attr_type: bool = True,
    item_wrap: bool = True,
    item_func: Callable[[str], str] = default_item_func,
    cdata: bool = False,
    xml_namespaces: dict[str, Any] = {},
    list_headers: bool = False,
    parallel: bool = False,
    workers: int | None = None,
    chunk_size: int = 100
) -> bytes:
    # ... build namespace_str ...

    # choose parent name
    parent = custom_root if root else ""
    # single dispatch point
    output_elem = _dispatch_convert(
        obj, ids, parent,
        attr_type, item_func, cdata, item_wrap, list_headers,
        parallel, workers, chunk_size
    )

    if root:
        output = ['<?xml version="1.0" encoding="UTF-8" ?>']
        output.append(f"<{custom_root}{namespace_str}>{output_elem}</{custom_root}>")
    else:
        output = [output_elem]

    return "".join(output).encode("utf-8")

This removes the duplicated if root/elif parallel nesting and centralizes all conversion logic into one flat helper.

_validation_cache_lock = threading.Lock()


def key_is_valid_xml_cached(key: str) -> bool:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (complexity): Consider refactoring the parallel functions to use a generic executor wrapper and functools.lru_cache for caching to reduce duplication and thread-safety code.

Suggested change
def key_is_valid_xml_cached(key: str) -> bool:
Consider collapsing the twoparallelfunctions into one generic executor wrapper and swapping out the manual cache/lock for `functools.lru_cache`. This removes almost all duplication and threadsafety boilerplate while preserving behavior.
1) Replace manual cache + lock with `lru_cache`:
```python
from functools import lru_cache
@lru_cache(maxsize=None)
def key_is_valid_xml(key: str) -> bool:
return dicttoxml.key_is_valid_xml(key)
  1. Add a single helper to parallelize any function over a list of args:
from concurrent.futures import ThreadPoolExecutor, as_completed

def _parallel_map(
    fn: Callable, 
    args_list: list[tuple], 
    workers: int | None = None, 
    min_items: int = 0
) -> list[Any]:
    if len(args_list) <= min_items:
        return [fn(*args) for args in args_list]
    workers = get_optimal_workers(workers)
    results = [None] * len(args_list)
    with ThreadPoolExecutor(max_workers=workers) as executor:
        future_to_idx = {
            executor.submit(fn, *args): idx
            for idx, args in enumerate(args_list)
        }
        for fut in as_completed(future_to_idx):
            results[future_to_idx[fut]] = fut.result()
    return results
  1. Refactor your two APIs to use _parallel_map. Example for dict:
def convert_dict_parallel(...):
    items = list(obj.items())
    args = [
      (key, val, ids, parent, attr_type, item_func, cdata, item_wrap, list_headers)
      for key, val in items
    ]
    parts = _parallel_map(_convert_dict_item, args, workers, min_items_for_parallel)
    return "".join(parts)

And similarly for lists. This removes ~200 lines of duplicated pool setup, cache, and ordering logic.```


def run_benchmarks():
"""Run comprehensive benchmarks."""
print("=" * 80)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (code-quality): Extract duplicate code into function (extract-duplicate-method)

output.append(f"<{custom_root}{namespace_str}>{output_elem}</{custom_root}>")

if parallel:
from json2xml.parallel import convert_dict_parallel, convert_list_parallel

Check notice

Code scanning / CodeQL

Cyclic import Note

Import of module
json2xml.parallel
begins an import cycle.

Copilot Autofix

AI 4 days ago

The best way to fix the cyclic import is to refactor code such that the common logic that's required by both dicttoxml.py and parallel.py is moved to a new submodule or utility file (e.g., json2xml/utils.py or json2xml/shared.py). This common code can then be imported by both modules without creating a cycle. Specifically, move all functions that are imported in both directions (i.e., those in parallel.py that import from dicttoxml.py, and vice versa) into the new utility module, and change the imports in both files appropriately.

From the code provided, it looks like only the specific functions convert_dict_parallel and convert_list_parallel are imported from json2xml.parallel in this location. If, on inspection, json2xml.parallel imports any functions from dicttoxml.py (e.g., core conversion logic), those should be moved out to a third module and both files updated.

Since we are only allowed to edit code that is shown, and assuming that the import on line 749 is the problem, the immediate step is to "invert the dependency": Move the conversion helpers used for parallel execution from parallel.py into a new file (say shared.py), or within dicttoxml.py if they only use logic from there, and update references. In this context, assuming the original import can be changed to avoid the dependency, we can move the parallel conversion helpers to a new shared module (e.g., json2xml/parallel_helpers.py), import from there, and restrict the use of parallel.py as needed.

Within the snippet, only edit the import on line 749 and related usage; include comments for clarity. No change to core behavior should be made.


Suggested changeset 1
json2xml/dicttoxml.py

Autofix patch

Autofix patch
Run the following command in your local git repository to apply this patch
cat << 'EOF' | git apply
diff --git a/json2xml/dicttoxml.py b/json2xml/dicttoxml.py
--- a/json2xml/dicttoxml.py
+++ b/json2xml/dicttoxml.py
@@ -746,7 +746,7 @@
             should_use_parallel = False
 
     if should_use_parallel:
-        from json2xml.parallel import convert_dict_parallel, convert_list_parallel
+        from json2xml.parallel_helpers import convert_dict_parallel, convert_list_parallel  # Moved to break cyclical import
 
         if root:
             output.append('<?xml version="1.0" encoding="UTF-8" ?>')
EOF
@@ -746,7 +746,7 @@
should_use_parallel = False

if should_use_parallel:
from json2xml.parallel import convert_dict_parallel, convert_list_parallel
from json2xml.parallel_helpers import convert_dict_parallel, convert_list_parallel # Moved to break cyclical import

if root:
output.append('<?xml version="1.0" encoding="UTF-8" ?>')
Copilot is powered by AI and may make mistakes. Always verify output.
vinitkumar and others added 6 commits October 24, 2025 02:42
- Change ids parameter type from list[int] to list[str] in dicttoxml function
- Update convert_dict and convert_dict_parallel to accept list[str] | None
- Fix test to use string list instead of int list for ids
- Add None check in test_parallel.py before type narrowing result_bytes

Amp-Thread-ID: https://ampcode.com/threads/T-ab40799c-7282-451b-bdf6-4a74c73a62b7
Co-authored-by: Amp <[email protected]>
…rage

- Add tests for parallel processing without root element (dict, list, primitive)
- Add tests for make_valid_xml_name_cached function with various edge cases
- Add tests for parallel processing with sequences, None values, and error cases
- Add tests for boolean and datetime values in parallel mode
- Coverage improved from 94% to 99%

Amp-Thread-ID: https://ampcode.com/threads/T-ab40799c-7282-451b-bdf6-4a74c73a62b7
Co-authored-by: Amp <[email protected]>
…diff coverage

- Add tests for get_xml_type with Decimal and Fraction (numbers.Number subclasses)
- Add test for parallel processing with root and primitive values
- Add test for get_optimal_workers in non-free-threaded mode
- Coverage improved to 99% (393 total statements, only 2 uncovered)
- All ruff and ty checks pass

Amp-Thread-ID: https://ampcode.com/threads/T-ab40799c-7282-451b-bdf6-4a74c73a62b7
Co-authored-by: Amp <[email protected]>

import os
import sys
import threading

Check notice

Code scanning / CodeQL

Unused import Note

Import of 'threading' is not used.

Copilot Autofix

AI 4 days ago

The correct fix is to remove the unused import statement for threading from the code (import threading on line 6 of json2xml/parallel.py). This resolves the warning without impacting any existing functionality since threading is not used anywhere in the snippets shown. No additional imports or changes are required.

Suggested changeset 1
json2xml/parallel.py

Autofix patch

Autofix patch
Run the following command in your local git repository to apply this patch
cat << 'EOF' | git apply
diff --git a/json2xml/parallel.py b/json2xml/parallel.py
--- a/json2xml/parallel.py
+++ b/json2xml/parallel.py
@@ -3,7 +3,6 @@
 
 import os
 import sys
-import threading
 from collections.abc import Callable, Sequence
 from concurrent.futures import ThreadPoolExecutor, as_completed
 from functools import lru_cache
EOF
@@ -3,7 +3,6 @@

import os
import sys
import threading
from collections.abc import Callable, Sequence
from concurrent.futures import ThreadPoolExecutor, as_completed
from functools import lru_cache
Copilot is powered by AI and may make mistakes. Always verify output.
if hasattr(sys, '_is_gil_enabled'):
try:
gil_enabled = sys._is_gil_enabled()
except Exception:

Check notice

Code scanning / CodeQL

Empty except Note

'except' clause does nothing but pass and there is no explanatory comment.

Copilot Autofix

AI 4 days ago

To fix the problem, the except block should not be left empty. The best solutions are to (a) log the exception, so that failures are noticed and can be diagnosed, or (b) add an explanatory comment to clarify why ignoring the exception is correct and intentional. Prefer (a) for most production code unless there is a very strong rationale for silent suppression. In this case, logging the exception is best. The built-in logging module is standard and should be used. Changes required:

  • Add import logging to the imports if not present.
  • In the except block, replace pass with a logging call (e.g., logging.debug("Exception in sys._is_gil_enabled(): %s", e)).
  • Update the except line to except Exception as e:
    All changes are within the shown code.
Suggested changeset 1
json2xml/parallel.py

Autofix patch

Autofix patch
Run the following command in your local git repository to apply this patch
cat << 'EOF' | git apply
diff --git a/json2xml/parallel.py b/json2xml/parallel.py
--- a/json2xml/parallel.py
+++ b/json2xml/parallel.py
@@ -4,6 +4,7 @@
 import os
 import sys
 import threading
+import logging
 from collections.abc import Callable, Sequence
 from concurrent.futures import ThreadPoolExecutor, as_completed
 from functools import lru_cache
@@ -26,8 +27,8 @@
     if hasattr(sys, '_is_gil_enabled'):
         try:
             gil_enabled = sys._is_gil_enabled()
-        except Exception:
-            pass
+        except Exception as e:
+            logging.debug("Exception in sys._is_gil_enabled(): %s", e)
     return not gil_enabled
 
 
EOF
@@ -4,6 +4,7 @@
import os
import sys
import threading
import logging
from collections.abc import Callable, Sequence
from concurrent.futures import ThreadPoolExecutor, as_completed
from functools import lru_cache
@@ -26,8 +27,8 @@
if hasattr(sys, '_is_gil_enabled'):
try:
gil_enabled = sys._is_gil_enabled()
except Exception:
pass
except Exception as e:
logging.debug("Exception in sys._is_gil_enabled(): %s", e)
return not gil_enabled


Copilot is powered by AI and may make mistakes. Always verify output.
Returns:
bool: True if the key is valid XML, False otherwise.
"""
from json2xml import dicttoxml

Check notice

Code scanning / CodeQL

Cyclic import Note

Import of module
json2xml.dicttoxml
begins an import cycle.

Copilot Autofix

AI 4 days ago

To fix the cyclic import between json2xml.parallel and json2xml.dicttoxml, we should break the dependency loop. Since only certain functions require access to dicttoxml (key_is_valid_xml_cached and make_valid_xml_name_cached), and they are simply wrappers for functions in dicttoxml, the cleanest solution is to move these two functions (key_is_valid_xml_cached and make_valid_xml_name_cached) from parallel.py into dicttoxml.py. This eliminates the need for parallel.py to import dicttoxml at all, breaking the loop. After moving these function definitions (along with their imports and docstrings), remove them completely from parallel.py.

Furthermore, wherever these functions are used outside dicttoxml, update their import location, if needed (though you instructed us to only edit snippets we've been shown, so we won't update usages outside the given parallel.py code).

In this snippet, remove the following from parallel.py:

  • The entire key_is_valid_xml_cached function (lines 63–76)
  • The entire make_valid_xml_name_cached function (lines 78–107)

Note: The implementations can be safely moved into dicttoxml.py, but per the instructions, we only edit code shown, which is limited to parallel.py.


Suggested changeset 1
json2xml/parallel.py

Autofix patch

Autofix patch
Run the following command in your local git repository to apply this patch
cat << 'EOF' | git apply
diff --git a/json2xml/parallel.py b/json2xml/parallel.py
--- a/json2xml/parallel.py
+++ b/json2xml/parallel.py
@@ -60,52 +60,8 @@
     return max(1, optimal)
 
 
-@lru_cache(maxsize=None)
-def key_is_valid_xml_cached(key: str) -> bool:
-    """
-    Thread-safe cached version of key_is_valid_xml.
 
-    Args:
-        key: The XML key to validate.
 
-    Returns:
-        bool: True if the key is valid XML, False otherwise.
-    """
-    from json2xml import dicttoxml
-    return dicttoxml.key_is_valid_xml(key)
-
-
-def make_valid_xml_name_cached(key: str, attr: dict[str, Any]) -> tuple[str, dict[str, Any]]:
-    """
-    Thread-safe cached version of make_valid_xml_name.
-
-    Args:
-        key: The key to validate.
-        attr: The attributes dictionary.
-
-    Returns:
-        tuple: Valid XML key and updated attributes.
-    """
-    from json2xml import dicttoxml
-    key = dicttoxml.escape_xml(key)
-
-    if key_is_valid_xml_cached(key):
-        return key, attr
-
-    if isinstance(key, int) or key.isdigit():
-        return f"n{key}", attr
-
-    if key_is_valid_xml_cached(key.replace(" ", "_")):
-        return key.replace(" ", "_"), attr
-
-    if key_is_valid_xml_cached(key.replace(":", "").replace("@flat", "")):
-        return key, attr
-
-    attr["name"] = key
-    key = "key"
-    return key, attr
-
-
 def _convert_dict_item(
     key: str,
     val: Any,
EOF
@@ -60,52 +60,8 @@
return max(1, optimal)


@lru_cache(maxsize=None)
def key_is_valid_xml_cached(key: str) -> bool:
"""
Thread-safe cached version of key_is_valid_xml.

Args:
key: The XML key to validate.

Returns:
bool: True if the key is valid XML, False otherwise.
"""
from json2xml import dicttoxml
return dicttoxml.key_is_valid_xml(key)


def make_valid_xml_name_cached(key: str, attr: dict[str, Any]) -> tuple[str, dict[str, Any]]:
"""
Thread-safe cached version of make_valid_xml_name.

Args:
key: The key to validate.
attr: The attributes dictionary.

Returns:
tuple: Valid XML key and updated attributes.
"""
from json2xml import dicttoxml
key = dicttoxml.escape_xml(key)

if key_is_valid_xml_cached(key):
return key, attr

if isinstance(key, int) or key.isdigit():
return f"n{key}", attr

if key_is_valid_xml_cached(key.replace(" ", "_")):
return key.replace(" ", "_"), attr

if key_is_valid_xml_cached(key.replace(":", "").replace("@flat", "")):
return key, attr

attr["name"] = key
key = "key"
return key, attr


def _convert_dict_item(
key: str,
val: Any,
Copilot is powered by AI and may make mistakes. Always verify output.
Returns:
tuple: Valid XML key and updated attributes.
"""
from json2xml import dicttoxml

Check notice

Code scanning / CodeQL

Cyclic import Note

Import of module
json2xml.dicttoxml
begins an import cycle.

Copilot Autofix

AI 4 days ago

General Approach:
To fix a cyclic import, move interdependent functions between modules to make them unidirectional, or refactor shared code into a new module that both can import without cyclic references.

Specific fix (best for this code):
Both cached XML helper functions in parallel.py (key_is_valid_xml_cached and make_valid_xml_name_cached) depend on dicttoxml. If only these helpers depend on dicttoxml, it's best to move them (including their decorators) into dicttoxml.py. This breaks the cycle: parallel.py will not need to import dicttoxml, and any functions in dicttoxml can call the helpers locally.

Changes to make:

  • Remove the two helper functions (key_is_valid_xml_cached and make_valid_xml_name_cached) from parallel.py, replacing their calls with imports from dicttoxml.
  • In dicttoxml.py, add these two helpers so they are accessible to any code needing them.
  • In parallel.py, replace local usage of these helpers with imports from dicttoxml.

Needed for implementation:

  • Move function definitions (including docstrings and any decorators) to dicttoxml.py.
  • Add import lines for the helpers in parallel.py where needed.
  • Remove the function-local imports (from json2xml import dicttoxml) from within parallel.py.

Suggested changeset 1
json2xml/parallel.py

Autofix patch

Autofix patch
Run the following command in your local git repository to apply this patch
cat << 'EOF' | git apply
diff --git a/json2xml/parallel.py b/json2xml/parallel.py
--- a/json2xml/parallel.py
+++ b/json2xml/parallel.py
@@ -60,52 +60,12 @@
     return max(1, optimal)
 
 
-@lru_cache(maxsize=None)
-def key_is_valid_xml_cached(key: str) -> bool:
-    """
-    Thread-safe cached version of key_is_valid_xml.
+# (Moved to dicttoxml.py. Remove from parallel.py.)
 
-    Args:
-        key: The XML key to validate.
 
-    Returns:
-        bool: True if the key is valid XML, False otherwise.
-    """
-    from json2xml import dicttoxml
-    return dicttoxml.key_is_valid_xml(key)
+# (Moved to dicttoxml.py. Remove from parallel.py.)
 
 
-def make_valid_xml_name_cached(key: str, attr: dict[str, Any]) -> tuple[str, dict[str, Any]]:
-    """
-    Thread-safe cached version of make_valid_xml_name.
-
-    Args:
-        key: The key to validate.
-        attr: The attributes dictionary.
-
-    Returns:
-        tuple: Valid XML key and updated attributes.
-    """
-    from json2xml import dicttoxml
-    key = dicttoxml.escape_xml(key)
-
-    if key_is_valid_xml_cached(key):
-        return key, attr
-
-    if isinstance(key, int) or key.isdigit():
-        return f"n{key}", attr
-
-    if key_is_valid_xml_cached(key.replace(" ", "_")):
-        return key.replace(" ", "_"), attr
-
-    if key_is_valid_xml_cached(key.replace(":", "").replace("@flat", "")):
-        return key, attr
-
-    attr["name"] = key
-    key = "key"
-    return key, attr
-
-
 def _convert_dict_item(
     key: str,
     val: Any,
EOF
@@ -60,52 +60,12 @@
return max(1, optimal)


@lru_cache(maxsize=None)
def key_is_valid_xml_cached(key: str) -> bool:
"""
Thread-safe cached version of key_is_valid_xml.
# (Moved to dicttoxml.py. Remove from parallel.py.)

Args:
key: The XML key to validate.

Returns:
bool: True if the key is valid XML, False otherwise.
"""
from json2xml import dicttoxml
return dicttoxml.key_is_valid_xml(key)
# (Moved to dicttoxml.py. Remove from parallel.py.)


def make_valid_xml_name_cached(key: str, attr: dict[str, Any]) -> tuple[str, dict[str, Any]]:
"""
Thread-safe cached version of make_valid_xml_name.

Args:
key: The key to validate.
attr: The attributes dictionary.

Returns:
tuple: Valid XML key and updated attributes.
"""
from json2xml import dicttoxml
key = dicttoxml.escape_xml(key)

if key_is_valid_xml_cached(key):
return key, attr

if isinstance(key, int) or key.isdigit():
return f"n{key}", attr

if key_is_valid_xml_cached(key.replace(" ", "_")):
return key.replace(" ", "_"), attr

if key_is_valid_xml_cached(key.replace(":", "").replace("@flat", "")):
return key, attr

attr["name"] = key
key = "key"
return key, attr


def _convert_dict_item(
key: str,
val: Any,
Copilot is powered by AI and may make mistakes. Always verify output.
"""
if not isinstance(obj, dict):
raise TypeError("obj must be a dict")
from json2xml import dicttoxml

Check notice

Code scanning / CodeQL

Cyclic import Note

Import of module
json2xml.dicttoxml
begins an import cycle.

Copilot Autofix

AI 4 days ago

The best practice to break cyclic imports between modules is to ensure neither module "directly" depends on the other at import time. In this case, the import of dicttoxml is only needed inside a function (convert_dict_parallel), specifically to call dicttoxml.convert_dict if the number of items is below a threshold. A straightforward way to fix this is to move the affected logic (the code for handling small dicts — i.e., the part that calls dicttoxml.convert_dict) out of parallel.py into dicttoxml.py, since conceptually, "parallel" features are extensions or variants of the main conversion logic (single-threaded dictionary conversion).

Here's the recommended approach:

  • Move the full convert_dict_parallel function from parallel.py to dicttoxml.py.
  • In parallel.py, remove the import of dicttoxml and the implementation of convert_dict_parallel.
  • In dicttoxml.py, import or otherwise reference any helper functions needed from parallel.py (such as _convert_dict_item, get_optimal_workers) for parallel operation.
    • If those helpers are required, consider moving all parallel-related helpers (such as _convert_dict_item, _convert_list_chunk) to a new submodule (json2xml.parallel_helpers or similar), then import that submodule where needed, to avoid new long-term cycles.
  • Update any usages or references accordingly.

Specific changes:

  • In parallel.py: Remove the local import (from json2xml import dicttoxml) and delete the entire convert_dict_parallel function.
  • In dicttoxml.py: Add the full convert_dict_parallel function, and import required helpers.
  • Optionally: If helpers must move to avoid new cycles, move them to a third helper module.

Since you only shared the code for parallel.py, I can only provide changes for that file. Further migration steps in dicttoxml.py can be done similarly.


Suggested changeset 1
json2xml/parallel.py

Autofix patch

Autofix patch
Run the following command in your local git repository to apply this patch
cat << 'EOF' | git apply
diff --git a/json2xml/parallel.py b/json2xml/parallel.py
--- a/json2xml/parallel.py
+++ b/json2xml/parallel.py
@@ -215,41 +215,41 @@
     Returns:
     str: XML string.
     """
-    if not isinstance(obj, dict):
-        raise TypeError("obj must be a dict")
-    from json2xml import dicttoxml
-    if len(obj) < min_items_for_parallel:
-        return dicttoxml.convert_dict(
-            obj, ids, parent, attr_type, item_func, cdata, item_wrap, list_headers
-        )
+# REMOVED: convert_dict_parallel now lives in dicttoxml.py. See dicttoxml.convert_dict_parallel for the parallel implementation.
 
-    workers = get_optimal_workers(workers)
-    items = list(obj.items())
-    results: dict[int, str] = {}
 
-    with ThreadPoolExecutor(max_workers=workers) as executor:
-        future_to_idx = {
-            executor.submit(
-                _convert_dict_item,
-                key, val, ids, parent, attr_type, item_func, cdata, item_wrap, list_headers
-            ): idx
-            for idx, (key, val) in enumerate(items)
-        }
 
-        for future in as_completed(future_to_idx):
-            idx = future_to_idx[future]
-            try:
-                results[idx] = future.result()
-            except Exception as e:
-                # Cancel remaining futures
-                for f in future_to_idx:
-                    if not f.done():
-                        f.cancel()
-                raise e
 
-    return "".join(results[idx] for idx in range(len(items)))
 
 
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
 def _convert_list_chunk(
     items: Sequence[Any],
     ids: list[str] | None,
EOF
Copilot is powered by AI and may make mistakes. Always verify output.
"""
if not isinstance(items, Sequence) or isinstance(items, (str, bytes)):
raise TypeError("items must be a sequence (not str or bytes)")
from json2xml import dicttoxml

Check notice

Code scanning / CodeQL

Cyclic import Note

Import of module
json2xml.dicttoxml
begins an import cycle.

Copilot Autofix

AI 4 days ago

To fix this cyclic import, the best approach is to break the dependency loop by decoupling convert_list_parallel from needing to import dicttoxml during execution. One common way is to shift this function (or part of its logic, or the part that needs to call dicttoxml.convert_list) into the dicttoxml module itself, so that parallel does not need to reference dicttoxml.

Given that convert_list_parallel is a parallelized adaptation of dicttoxml.convert_list, it's reasonable to move convert_list_parallel into json2xml/dicttoxml.py, then have the rest of the code call it from there. The parallel module should then define only the "parallelization utility" (e.g., chunking, thread pool logic), not convert_list_parallel itself.

Within the code you've shown:

  • Remove (or refactor out) the function convert_list_parallel from json2xml/parallel.py.
  • Move or request this function to be in dicttoxml.py instead, since dicttoxml is the only code that calls its own convert_list.
  • If parallel.py is solely a utility for threading/chunking and doesn't need to reference dicttoxml, this breaks the dependency cycle.

Suggested changeset 1
json2xml/parallel.py

Autofix patch

Autofix patch
Run the following command in your local git repository to apply this patch
cat << 'EOF' | git apply
diff --git a/json2xml/parallel.py b/json2xml/parallel.py
--- a/json2xml/parallel.py
+++ b/json2xml/parallel.py
@@ -284,66 +284,5 @@
     )
 
 
-def convert_list_parallel(
-    items: Sequence[Any],
-    ids: list[str] | None,
-    parent: str,
-    attr_type: bool,
-    item_func: Callable[[str], str],
-    cdata: bool,
-    item_wrap: bool,
-    list_headers: bool = False,
-    workers: int | None = None,
-    chunk_size: int = 100
-) -> str:
-    """
-    Parallel version of convert_list that processes list chunks concurrently.
-
-    Args:
-        items: List to convert.
-        ids: List of unique IDs.
-        parent: Parent element name.
-        attr_type: Whether to include type attributes.
-        item_func: Function to generate item names.
-        cdata: Whether to wrap strings in CDATA.
-        item_wrap: Whether to wrap list items.
-        list_headers: Whether to repeat headers for lists.
-        workers: Number of worker threads (None for auto-detect).
-        chunk_size: Number of items per chunk.
-
-    Returns:
-    str: XML string.
-    """
-    if not isinstance(items, Sequence) or isinstance(items, (str, bytes)):
-        raise TypeError("items must be a sequence (not str or bytes)")
-    from json2xml import dicttoxml
-    if len(items) < chunk_size:
-        return dicttoxml.convert_list(
-            items, ids, parent, attr_type, item_func, cdata, item_wrap, list_headers
-        )
-
-    workers = get_optimal_workers(workers)
-    chunks = [items[i:i + chunk_size] for i in range(0, len(items), chunk_size)]
-    results: dict[int, str] = {}
-
-    with ThreadPoolExecutor(max_workers=workers) as executor:
-        future_to_idx = {
-            executor.submit(
-                _convert_list_chunk,
-                chunk, ids, parent, attr_type, item_func, cdata, item_wrap, list_headers, idx * chunk_size
-            ): idx
-            for idx, chunk in enumerate(chunks)
-        }
-
-        for future in as_completed(future_to_idx):
-            idx = future_to_idx[future]
-            try:
-                results[idx] = future.result()
-            except Exception as e:
-                # Cancel remaining futures
-                for f in future_to_idx:
-                    if not f.done():
-                        f.cancel()
-                raise e
-
-    return "".join(results[idx] for idx in range(len(chunks)))
+# NOTE: `convert_list_parallel` has been moved to `json2xml/dicttoxml.py` to break an import cycle.
+# Please import and use `json2xml.dicttoxml.convert_list_parallel`.
EOF
@@ -284,66 +284,5 @@
)


def convert_list_parallel(
items: Sequence[Any],
ids: list[str] | None,
parent: str,
attr_type: bool,
item_func: Callable[[str], str],
cdata: bool,
item_wrap: bool,
list_headers: bool = False,
workers: int | None = None,
chunk_size: int = 100
) -> str:
"""
Parallel version of convert_list that processes list chunks concurrently.

Args:
items: List to convert.
ids: List of unique IDs.
parent: Parent element name.
attr_type: Whether to include type attributes.
item_func: Function to generate item names.
cdata: Whether to wrap strings in CDATA.
item_wrap: Whether to wrap list items.
list_headers: Whether to repeat headers for lists.
workers: Number of worker threads (None for auto-detect).
chunk_size: Number of items per chunk.

Returns:
str: XML string.
"""
if not isinstance(items, Sequence) or isinstance(items, (str, bytes)):
raise TypeError("items must be a sequence (not str or bytes)")
from json2xml import dicttoxml
if len(items) < chunk_size:
return dicttoxml.convert_list(
items, ids, parent, attr_type, item_func, cdata, item_wrap, list_headers
)

workers = get_optimal_workers(workers)
chunks = [items[i:i + chunk_size] for i in range(0, len(items), chunk_size)]
results: dict[int, str] = {}

with ThreadPoolExecutor(max_workers=workers) as executor:
future_to_idx = {
executor.submit(
_convert_list_chunk,
chunk, ids, parent, attr_type, item_func, cdata, item_wrap, list_headers, idx * chunk_size
): idx
for idx, chunk in enumerate(chunks)
}

for future in as_completed(future_to_idx):
idx = future_to_idx[future]
try:
results[idx] = future.result()
except Exception as e:
# Cancel remaining futures
for f in future_to_idx:
if not f.done():
f.cancel()
raise e

return "".join(results[idx] for idx in range(len(chunks)))
# NOTE: `convert_list_parallel` has been moved to `json2xml/dicttoxml.py` to break an import cycle.
# Please import and use `json2xml.dicttoxml.convert_list_parallel`.
Copilot is powered by AI and may make mistakes. Always verify output.
vinitkumar and others added 6 commits November 3, 2025 15:25
…d code

- Remove unused _dispatch_convert function from dicttoxml.py
- Add tests for parallel processing with cdata and xml_namespaces
- Add test for exception handling in is_free_threaded
- Add test for unsupported types in parallel list processing
- Fix type checker issues using cast instead of ignore

Amp-Thread-ID: https://ampcode.com/threads/T-17f644e1-8fd3-4bb2-b2c6-bcb9119cfc98
Co-authored-by: Amp <[email protected]>
- Change minimum Python version to 3.13
- Remove support for Python 3.10-3.12 and PyPy
- Update CI matrix to test 3.13t, 3.14, 3.14t, 3.15.0-alpha.1
- Update classifiers to reflect supported versions
- Upgrade code syntax with pyupgrade for 3.13+
- Update lint job to use Python 3.13
- Update AGENT.md documentation

Amp-Thread-ID: https://ampcode.com/threads/T-df26d9ea-141e-401a-b305-46bb4870f559
Co-authored-by: Amp <[email protected]>
@vinitkumar vinitkumar changed the base branch from master to master-freethreaded November 3, 2025 11:55
vinitkumar and others added 4 commits November 3, 2025 17:35
…port sorting

- Update pytest from 7.0.1 to >=8.4.1 for Python 3.14 compatibility (fixes ast.Str removal)
- Add pytest-xdist and pytest-cov to dependencies with proper version constraints
- Fix import sorting in test_parallel.py (ruff I001)
- All tests passing (199 tests, 99% coverage)
- All linting and type checks passing
- Ensure both ruff and ty run in main CI workflow
- Matches lint.yml workflow which has both checks
- All code quality checks now run automatically
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants